﻿#include <QtConcurrent/QtConcurrent>
#include "zmqsubscriber.h"

/**
 * @brief Constructs the subscriber.
 *
 * Only forwards @p parent to the BaseSubscriber constructor; no ZeroMQ
 * resources are created here.
 *
 * @param parent Parent object passed on to BaseSubscriber.
 */
ZmqSubscriber::ZmqSubscriber(QObject *parent)
    : BaseSubscriber(parent)
{
}

/**
 * @brief Initialises the SUB socket and, on success, starts the receive loop.
 *
 * The receive loop (run()) is handed to QtConcurrent::run so that this call
 * returns immediately instead of looping in the caller's thread.
 *
 * @return true when initSUBModuleConnect() succeeded and run() was scheduled,
 *         false when the initialisation failed (run() is not started then).
 */
bool ZmqSubscriber::connectToHost()
{
    const bool initialised = initSUBModuleConnect();
    if(!initialised)
    {
        return false;
    }

    // run() loops until isRun is cleared, so it must not execute on this thread.
    QtConcurrent::run([this]() { run(); });
    return true;
}

/**
 * @brief Reports the connection state.
 *
 * NOTE(review): the result is unconditionally true; no socket or worker state
 * is inspected. Confirm whether callers rely on that before tying the result
 * to the real connection state.
 *
 * @return Always true.
 */
bool ZmqSubscriber::isConnected()
{
    const bool connected = true;
    return connected;
}

bool ZmqSubscriber::disConnect()
{
    isRun=false;
    zmq_ctx_destroy(_context);
    zmq_close(_socket);
    return true;
}

bool ZmqSubscriber::initSUBModuleConnect()
{
    _context = zmq_ctx_new();
    _socket = zmq_socket(_context, ZMQ_SUB);
    int64_t max_msg_size = 16 * 1024 * 1024;  // 设置为 16 MB
    zmq_setsockopt(_socket,ZMQ_MAXMSGSIZE,&max_msg_size,sizeof(max_msg_size));

    QString commParam=getCommParam();
    //"tcp://localhost:5556"
    if(zmq_connect(_socket, commParam.toUtf8()) < 0) {
         qDebug()<<"Failed to open the socket "<<commParam;
         return false;
    }
    //重要：设置订阅主题，否则接收不到数据
    if(zmq_setsockopt(_socket, ZMQ_SUBSCRIBE, "", 0) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_SUBSCRIBE error";
        return false;
    }

    int mstimeout = 3000;
    if(zmq_setsockopt(_socket, ZMQ_RCVTIMEO, &mstimeout, sizeof(int)) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_RCVTIMEO error";
        return false;
    }

    //设置保活机制
    int tcp_keep_alive = 1;
    if(zmq_setsockopt(_socket, ZMQ_TCP_KEEPALIVE, &tcp_keep_alive, sizeof(tcp_keep_alive)) < 0) {
        qDebug()<<"Failed to setsocketopt ZMQ_TCP_KEEPALIVE error";
        return false;
    }
    return true;
}

void ZmqSubscriber::run()
{
    isRun = true;
    int RECEIVE_MSG_MAX_LENGTH=16*1024*1024;
    unsigned char *receivetMsg = new unsigned char[RECEIVE_MSG_MAX_LENGTH];
    QElapsedTimer waittime;
    waittime.start();
    while(isRun)
    {
        QThread::msleep(50);
        if(_socket)
        {
            int len = zmq_recv(_socket, receivetMsg, RECEIVE_MSG_MAX_LENGTH * sizeof(unsigned char),0);
            if(len < 0) {
               QThread::msleep(500);
            } else {
                QByteArray data((char*)(receivetMsg), len);
                emit receive(data);
                waittime.start();
            }

            if(waittime.elapsed() > 1000*60*5) {//超过5分钟没有数据，则重新连接
                qDebug()<<__FUNCTION__<<__LINE__<<"no data arrive for longtime!";
                waittime.restart();
                initSUBModuleConnect();    //测试是否是导致数据冗余原因，暂时注释
            }
        }
    }
    delete [] receivetMsg;
}
